終於要開始遊戲了,在此之前我們還需要將連線和房間、玩家資料做綁定,這樣遊戲伺服器不會搞錯資料發送的對象。前面會講解概念的部分,也會附上目前實作,從開房間到開始遊戲這部分是可以運作的,我們之後會再加上遊戲運作的部分。
在 python websockets 套件中並不像 socketio 那樣有內建房間等概念,所以我們需要自己維護房間的資訊。
在遊戲開始之後,所有遊戲相關的操作都還是透過 websocket 的連線物件 (後面都以 conn 代稱) 讀取,但遊戲伺服器其實不知道這個連線對應了哪個房間?哪位玩家?所以在遊戲之前我們需要額外建立一個「連線對應(房間&玩家)」的對照表,這樣子遊戲伺服器收到封包資料的時候,它就可以查詢這個連線對應的房號和玩家編號。
那我們要用什麼來識別連線物件呢?還好 websockets
套件的連線物件自己本身都自帶一個 id (uuid v4) 這樣子我們就可以安心地拿來使用了
room_conns = {} # 定義對照表
room_coons[room_id] = [conn1, conn2] # 以 room_id 作為 key, 對應到房間內的所有 conn
player_conns = {} # 定義對照表
conns = [room_id] # 取得房間內所有的 conn
player_conns[conns[0].id] = (room_id, 1) # 以 conn_id 作為 key,對應 (room_id, player_id)
player_conns[conns[1].id] = (room_id, 2) # 第二位玩家的對照,同上
game.py
import random
def init_game():
# 初始玩家生命值
initial_health = 15
# 初始卡牌堆
player1_deck = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
player2_deck = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
player1_hand = [player1_deck.pop(random.randint(0, len(player1_deck) - 1))]
player2_hand = [player2_deck.pop(random.randint(0, len(player2_deck) - 1))]
# 遊戲狀態字典
game_state = {
"player1_health": initial_health,
"player2_health": initial_health,
"player1_deck": player1_deck,
"player2_deck": player2_deck,
"player1_hand": player1_hand,
"player2_hand": player2_hand,
"current_player": 1, # 1代表玩家1,2代表玩家2
"winner": None # 初始沒有獲勝者
}
return game_state
def draw_card(game_state):
current_player = game_state["current_player"]
deck = game_state[f"player{current_player}_deck"]
hand = game_state[f"player{current_player}_hand"]
if len(deck) > 0:
# 從卡牌堆中隨機抽一張卡牌
card_index = random.randint(0, len(deck) - 1)
card_id = deck.pop(card_index)
# 把卡牌加入手牌
hand.append(card_id)
print(f'玩家{current_player} 抽出 {card_id} 並加入手牌')
# 更新遊戲狀態
updated_state = {
f"player{current_player}_deck": deck,
f"player{current_player}_hand": hand,
}
return updated_state
# TODO 事前排除 card_id 不在玩家手牌中的案例
def play_card(game_state, card_id):
current_player = game_state["current_player"]
other_player = 2 if current_player == 1 else 1 # 對手
hand = game_state[f"player{current_player}_hand"]
other_player_health = game_state[f"player{other_player}_health"]
if card_id in hand:
# 找到卡牌並計算傷害值(這裡可以根據卡牌的屬性進行計算)
damage = card_id
# 更新對手的生命值
other_player_health -= damage
print(f'玩家{current_player}使用{card_id}造成傷害,玩家{other_player}生命剩下{other_player_health}點')
if other_player_health <= 0:
# 如果對手生命值小於等於0,遊戲結束,當前玩家獲勝
game_state["winner"] = current_player
print(f'遊戲結束,贏家為{current_player}')
# 移除已經使用的卡牌
hand.remove(card_id)
else:
print(f'WARN: {card_id} 不在玩家手牌中')
# 更新遊戲狀態
updated_state = {
f"player{other_player}_health": other_player_health,
f"player{current_player}_hand": hand,
"current_player": other_player, # 切換回合
}
return updated_state
# NOTE: 自動遊玩,測試用
def autoplay_card(game_state):
current_player = game_state["current_player"]
other_player = 2 if current_player == 1 else 1 # 對手
hand = game_state[f"player{current_player}_hand"]
other_player_health = game_state[f"player{other_player}_health"]
if len(hand) == 0:
print(f'ERRO: 玩家手牌為空')
return None
card_id = hand.pop() # 取出手牌最後一張
# 找到卡牌並計算傷害值(這裡可以根據卡牌的屬性進行計算)
damage = card_id
# 更新對手的生命值
other_player_health -= damage
print(f'玩家{current_player}使用{card_id}造成傷害,玩家{other_player}生命剩下{other_player_health}點')
if other_player_health <= 0:
# 如果對手生命值小於等於0,遊戲結束,當前玩家獲勝
game_state["winner"] = current_player
print(f'遊戲結束,贏家為{current_player}')
# 更新遊戲狀態
updated_state = {
f"player{other_player}_health": other_player_health,
f"player{current_player}_hand": hand,
"current_player": other_player, # 切換回合
}
return updated_state
def is_game_over(game_state):
return game_state["winner"] != None
# 將遊戲狀態以玩家視角輸出,過濾掉非公開資訊
def game_state_view_as(game_state, player_id):
other_player = 2 if player_id == 1 else 1 # 對手
game_data = {
"playerHealth": game_state[f"player{player_id}_health"],
"playerHand": game_state[f"player{player_id}_hand"],
"opponentHealth": game_state[f"player{other_player}_health"],
"winner": game_state["winner"]
}
return game_data
if __name__ == "__main__":
game_state = init_game() # 初始化遊戲
turn = 1
print(game_state)
while not is_game_over(game_state):
current_player = game_state['current_player']
print(f'第{turn}回合,輪到玩家{current_player},請按下 enter 表示玩家抽牌', end='')
_ = input() # 模擬玩家行動
updated_state = draw_card(game_state)
game_state.update(updated_state)
print(f"玩家{current_player}的手牌:", game_state[f"player{current_player}_hand"])
# updated_state = play_card(game_state, game_state[f"player{current_player}_hand"][0])
updated_state = autoplay_card(game_state) # 純自動模式
game_state.update(updated_state)
print("玩家1的生命值:", game_state["player1_health"])
print("玩家2的生命值:", game_state["player2_health"])
turn += 1
# print(game_state)
room.py
import asyncio
import websockets
import json
from game import init_game, draw_card, autoplay_card, game_state_view_as, is_game_over
server_host, server_port = 'localhost', 8765
# 存儲房間和玩家的資訊
player_conns = {} # conn_id => (room_id, player_id)
room_conns = {} # room_id => [conn...]
# NOTE: if conn_id in player_conns, then conn is gamming
games = {} # room_id => game
# 發送訊息給指定對象
async def notify(conn, payload):
print(f'DEBUG: notify conn#{conn.id}', payload)
await conn.send(payload)
# print(f'DEBUG: _notify to conn#{conn.id} done')
# 發送訊息給群體
async def notify_all(conns, payload):
for conn in conns:
await notify(conn, payload)
# 通知玩家行動/等待
async def send_turn_notify(room_id):
print(f'DEBUG: send_turn_notify room#{room_id}')
game_state = games[room_id]
conns = room_conns[room_id]
current_player_id = game_state["current_player"]
other_player_id = 2 if current_player_id == 1 else 1 # 對手
current_player_conn = conns[current_player_id]
other_player_conn = conns[other_player_id]
payload1 = json.dumps({"message": "turn start"})
await notify(current_player_conn, payload1)
payload2 = json.dumps({"message": "wait"})
await notify(other_player_conn, payload2)
# 遊戲開始
async def game_start(room_id):
print(f'DEBUG: game_start room#{room_id}')
conns = room_conns[room_id]
player_conns[conns[0].id] = (room_id, 1)
player_conns[conns[1].id] = (room_id, 2)
games[room_id] = init_game()
payload = json.dumps({"message": "game start"})
await notify_all(conns, payload)
room_id
game_state_view_as1 = game_state_view_as(games[room_id], 1)
state1 = json.dumps({"gameEvent": "setState", "gameData": game_state_view_as1})
await notify(conns[0], state1)
game_state_view_as2 = game_state_view_as(games[room_id], 2)
state2 = json.dumps({"gameEvent": "setState", "gameData": game_state_view_as2})
await notify(conns[1], state2)
# await send_turn_notify(room_id)
# 定義 WebSocket 伺服器的處理邏輯
async def game_server(conn, path):
print(f'DEBUG: create conn#{conn.id}')
async for message in conn:
data = json.loads(message)
action = data.get("action")
payload = data.get("payload")
print('message', conn.id, action, payload)
if action:
if action == "create_room":
print('DEBUG: room_conns', room_conns)
# 創建房間
room_id = len(room_conns) + 1
room_conns[room_id] = [conn]
print('create room', room_conns[room_id])
response = {"message": f"Room created with ID {room_id}"}
await conn.send(json.dumps(response))
elif action == "join_room":
print('DEBUG: room_conns', room_conns)
# 加入房間
room_id = payload.get("room_id")
if room_id in room_conns and len(room_conns[room_id]) < 2:
room_conns[room_id].append(conn)
print('join room', room_conns[room_id])
response = {"message": f"Joined room {room_id}"}
await conn.send(json.dumps(response))
else:
response = {"error": "Room is full or does not exist"}
await conn.send(json.dumps(response))
elif action == "start_game":
# 開始遊戲
room_id = payload.get("room_id")
if room_id in room_conns and len(room_conns[room_id]) == 2:
response = {"message": "Starting the game..."}
await conn.send(json.dumps(response))
# 在這裡可以加入遊戲的邏輯
await game_start(room_id)
else:
response = {"error": "Invalid room or not enough players"}
await conn.send(json.dumps(response))
print(f'DEBUG: conn#{conn.id} is disconnected')
# 啟動 WebSocket 伺服器
async def main():
start_server = await websockets.serve(game_server, server_host, server_port)
print(f"WebSocket server started on ws://{server_host}:{server_port}")
await start_server.wait_closed()
if __name__ == "__main__":
asyncio.run(main())
room.html
<!DOCTYPE html>
<html>
<head>
<title>WebSocket Game Client</title>
</head>
<body>
<h1>WebSocket Game</h1>
<div id="messages"></div>
<button onclick="sendMessage('create_room')">Create Room</button>
<button onclick="joinRoom()">Join Room</button>
<button onclick="startGame()">Start Game</button>
<script>
var ws;
var messages = document.getElementById("messages");
function showMessage(message) {
var p = document.createElement("p");
p.textContent = message;
messages.appendChild(p);
}
function sendMessage(action, payload = {}) {
if (ws && ws.readyState === WebSocket.OPEN) {
var message = JSON.stringify({ action: action, payload: payload });
ws.send(message);
} else {
showMessage("WebSocket not connected.");
}
}
function joinRoom() {
var room_id = parseInt(prompt("Enter room ID:"));
if (isNaN(room_id)) {
alert('room_id is not number')
return
}
sendMessage('join_room', { room_id: room_id });
}
function startGame() {
var room_id = parseInt(prompt("Enter room ID to start the game:"));
if (isNaN(room_id)) {
alert('room_id is not number')
return
}
sendMessage('start_game', { room_id: room_id });
}
ws = new WebSocket("ws://localhost:8765");
ws.onmessage = function(event) {
var data = JSON.parse(event.data);
if (data.message || data.error) {
showMessage(data.message || data.error);
}
if (data.gameEvent) {
console.log('gameEvent', data)
}
};
</script>
</body>
</html>
遊戲部分已經在 D23 的時候封裝成 module 了,但後來還想更進一步簡化操作的部分,所以多做了一個 autoplay_card() 這樣子玩家在呼叫這個 API 的時候就不用帶 card_id 參數,測試的時候會更簡單
連線綁定的部分花了蠻多時間思考的,其實最大問題就是到底核心是「websocket」還是「game」的議題。後來其實沒有什麼結論,websockets 套件底下是以「連線」為主體去思考問題,如果要處理跟遊戲有關的功能就會需要相關的對照表。而遊戲伺服器那邊需要發送更新通知,它會需要對應的「連線」才能把資料送給對的玩家,所以兩邊都是互相依賴的狀況。目前是以「能動就好」的方式下去開發,各位看官可以根據自己的需求再微調程式架構
上述範例程式碼安置到 github 或其他環境